RAGできるSlackチャットボットを作ってみた
こんにちは。たにもんです。
生成AIを活用したアプリケーションの代表例としてRAG (Retrieval-Augmented Generation; 検索拡張生成) があります。 LLMが生成する文章にはもっともらしい嘘(ハルシネーション)が含まれることがありますが、RAGを用いることでハルシネーションを抑える効果が期待できます。
ハルシネーションはLLMが学習していない知識に関する文章を生成する際に発生する可能性が高まりますが、RAGではユーザーの入力に関連する情報を外部から検索してLLMの知識を補ってあげることで精度向上を目指します。
今回はRAGを実行できるSlackチャットボットを作ってみたので紹介します。 実際に動かしてみた様子は以下のとおりです。LLMが学習していないであろう 生成AI環境構築サービス「AI-Starter」 について正しく説明してくれています。
今回作成したコード全体は以下のGitHubリポジトリで公開しているので、興味のある方はご参照ください。
https://github.com/tanimon/ai-chatbot-slack/tree/v2024.06.13
システム構成
Slack Appの作成
まずはインターフェースとなるSlack App(ボット)を作成しましょう。 作成方法の具体的な手順は以下のブログが参考になるかと思います。
[Slack API] Bolt for JavaScript を使用した Getting Started を試してみた | DevelopersIO
今回は以下のようなマニフェストを利用してSlack Appを作成しました。
display_information: name: AIチャットボット description: RAGができるAIチャットボット background_color: "#000000" features: bot_user: display_name: ai-chatbot always_online: false oauth_config: scopes: bot: - app_mentions:read - chat:write - channels:history - groups:history - im:history settings: event_subscriptions: request_url: https://xxxxxxxxxx.ap-northeast-1.awsapprunner.com/slack/events bot_events: - app_mention org_deploy_enabled: false socket_mode_enabled: false token_rotation_enabled: false
アプリの作成が完了したら、以下の認証情報を控えておいてください。 CDKデプロイ時にパラメータとして設定する必要があります。
- Basic Information > App Credentialsに記載されているSigning Secret
- OAuth & Permissions > OAuth Tokens for Your Workspaceに記載されているBot User OAuth Token
アプリケーションの実装
次はSlack Appのロジックを実装しましょう。 今回は Slack Bolt を用いて次のようなコードを書きました。
import logging import os import re from slack_bolt import App, Say from server.rag import llm_chain, rag_chain logging.basicConfig(format="%(asctime)s %(message)s", level=logging.DEBUG) # ボットトークンと署名シークレットを使ってアプリを初期化する app = App( token=os.environ.get("SLACK_BOT_TOKEN"), signing_secret=os.environ.get("SLACK_SIGNING_SECRET"), ) # ボットへのメンションに対するイベントリスナー @app.event("app_mention") def handle_app_mention(event, say: Say, logger: logging.Logger): logger.debug(f"app_mention event: {event}") text = event["text"] channel = event["channel"] thread_ts = event.get("thread_ts") or event["ts"] say(channel=channel, thread_ts=thread_ts, text="考え中です...少々お待ちください...") payload = remove_mention(text) logger.debug(f"payload: {payload}") result = ( rag_chain.invoke(payload) if is_rag_enabled() else llm_chain.invoke(payload) ) logger.debug(f"result: {result}") say(channel=channel, thread_ts=thread_ts, text=result) @app.error def handle_error(error, event, say: Say, logger: logging.Logger): logger.exception(f"エラーが発生しました: {error}") channel = event["channel"] thread_ts = event.get("thread_ts") or event["ts"] say(channel=channel, thread_ts=thread_ts, text=f"エラーが発生しました: {error}") def remove_mention(text: str) -> str: """メンションを除去する""" mention_regex = r"<@.*>" return re.sub(mention_regex, "", text).strip() def is_rag_enabled() -> bool: """RAGが有効かどうかを返す""" return os.environ.get("RAG_ENABLED", "false").lower() == "true" app.start(port=int(os.environ.get("PORT", 3000)))
今回はRAGを利用するか素のLLMを呼び出すかを環境変数から設定できるようにしました。 以下の部分で環境変数の値に応じてRAGもしくはLLMを利用して回答を生成し、その結果をSlackのスレッドに返信しています。
result = ( rag_chain.invoke(payload) if is_rag_enabled() else llm_chain.invoke(payload) ) logger.debug(f"result: {result}") say(channel=channel, thread_ts=thread_ts, text=result)
RAGは以下に示したコードのとおり、LangChainを用いて実装しています。 今回は次のような構成でRAGを構築しました。
- ユーザー入力の埋め込み作成に用いるモデル: Amazon Titan Text Embeddings V2
- ベクトルDB: OpenSearch Serverless
- 回答生成モデル: Claude 3 Haiku (Bedrock経由)
import os import boto3 from langchain import hub from langchain_aws import BedrockEmbeddings from langchain_aws.chat_models import ChatBedrock from langchain_community.vectorstores import OpenSearchVectorSearch from langchain_core.documents import Document from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import Runnable, RunnablePassthrough from opensearchpy import RequestsHttpConnection from requests_aws4auth import AWS4Auth # type: ignore embedding = BedrockEmbeddings( model_id="amazon.titan-embed-text-v2:0", region_name="us-east-1", client=None ) credentials = boto3.Session().get_credentials() aws_auth = AWS4Auth( refreshable_credentials=credentials, region="ap-northeast-1", service="aoss", ) vectorstore = OpenSearchVectorSearch( opensearch_url=os.environ["AOSS_ENDPOINT_URL"], index_name=os.environ["AOSS_INDEX_NAME"], embedding_function=embedding, http_auth=aws_auth, timeout=300, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, engine="faiss", ) retriever = vectorstore.as_retriever() def format_docs(docs: list[Document]) -> str: return "\n\n".join([doc.page_content for doc in docs]) prompt = hub.pull("rlm/rag-prompt") llm = ChatBedrock( model_id="anthropic.claude-3-haiku-20240307-v1:0", region_name="us-east-1", client=None, model_kwargs={ "temperature": 0, }, ) rag_chain: Runnable = ( {"context": retriever | format_docs, "question": RunnablePassthrough()} | prompt | llm | StrOutputParser() ) llm_chain: Runnable = llm | StrOutputParser()
プロンプトに関してはLangChain Hubから取得したrlm/rag-promptを利用しました。 こちらのプロンプトの中身は以下のようになっています。
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise. Question: {question} Context: {context} Answer:
インフラをCDKで構築
インフラを以下のCKDコードで構築しました。 OpenSearch Serverlessコレクション、App Runnerサービスおよびそれらに紐付くリソースを作成しています。
import * as apprunner from "@aws-cdk/aws-apprunner-alpha"; import * as cdk from "aws-cdk-lib"; import { Construct } from "constructs"; export class MainStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); const knowledgeBaseCollection = new cdk.aws_opensearchserverless.CfnCollection( this, "KnowledgeBaseCollection", { name: "knowledge-base", type: "VECTORSEARCH", standbyReplicas: "DISABLED", }, ); const knowledgeBaseCollectionEncryptionPolicy = new cdk.aws_opensearchserverless.CfnSecurityPolicy( this, "KnowledgeBaseCollectionEncryptionPolicy", { name: "knowledge-base-encryption-policy", type: "encryption", policy: JSON.stringify({ Rules: [ { ResourceType: "collection", Resource: [`collection/${knowledgeBaseCollection.name}`], }, ], AWSOwnedKey: true, }), }, ); // NOTE: コレクションを作成する前に、コレクションの名前と一致するリソースパターンを含む暗号化ポリシーを作成しておく必要がある // 暗号化ポリシー作成後にコレクションが作成されるように依存関係を設定する // @see https://docs.aws.amazon.com/ja_jp/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create // @see https://docs.aws.amazon.com/ja_jp/opensearch-service/latest/developerguide/serverless-encryption.html knowledgeBaseCollection.addDependency( knowledgeBaseCollectionEncryptionPolicy, ); new cdk.aws_opensearchserverless.CfnSecurityPolicy( this, "KnowledgeBaseCollectionNetworkPolicy", { name: "knowledge-base-network-policy", type: "network", policy: JSON.stringify([ { Rules: [ { ResourceType: "collection", Resource: [`collection/${knowledgeBaseCollection.name}`], }, { ResourceType: "dashboard", Resource: [`collection/${knowledgeBaseCollection.name}`], }, ], AllowFromPublic: true, }, ]), }, ); const serverServiceInstanceRole = new cdk.aws_iam.Role( this, "ServerServiceInstanceRole", { assumedBy: new cdk.aws_iam.ServicePrincipal( "tasks.apprunner.amazonaws.com", ), }, ); const aossIndexingPrincipalArn = cdk.aws_ssm.StringParameter.valueForStringParameter( this, "AossIndexingPrincipalArn", ); new cdk.aws_opensearchserverless.CfnAccessPolicy( this, "KnowledgeBaseCollectionAccessPolicy", { name: "knowledge-base-access-policy", type: "data", policy: JSON.stringify([ { Rules: [ { ResourceType: "collection", Resource: [`collection/${knowledgeBaseCollection.name}`], Permission: ["aoss:*"], }, { ResourceType: "index", Resource: [`index/${knowledgeBaseCollection.name}/*`], Permission: ["aoss:*"], }, ], Principal: [ serverServiceInstanceRole.roleArn, aossIndexingPrincipalArn, // インデックス操作を行うプリンシパルのARN ], }, ]), }, ); const aossIndexName = cdk.aws_ssm.StringParameter.valueForStringParameter( this, "AossIndexName", ); const ragEnabled = cdk.aws_ssm.StringParameter.valueForStringParameter( this, "RagEnabled", ); const slackBotToken = cdk.aws_ssm.StringParameter.valueForStringParameter( this, "SlackBotToken", ); const slackSignSecret = cdk.aws_ssm.StringParameter.valueForStringParameter( this, "SlackSignSecret", ); const serverService = new apprunner.Service(this, "ServerService", { source: apprunner.Source.fromAsset({ imageConfiguration: { port: 3000, environmentVariables: { AOSS_ENDPOINT_URL: knowledgeBaseCollection.attrCollectionEndpoint, AOSS_INDEX_NAME: aossIndexName, RAG_ENABLED: ragEnabled, SLACK_BOT_TOKEN: slackBotToken, SLACK_SIGNING_SECRET: slackSignSecret, }, }, asset: new cdk.aws_ecr_assets.DockerImageAsset(this, "ImageAsset", { directory: "../server", platform: cdk.aws_ecr_assets.Platform.LINUX_AMD64, }), }), cpu: apprunner.Cpu.ONE_VCPU, memory: apprunner.Memory.TWO_GB, healthCheck: apprunner.HealthCheck.tcp({}), instanceRole: serverServiceInstanceRole, autoDeploymentsEnabled: true, }); serverService.addToRolePolicy( new cdk.aws_iam.PolicyStatement({ effect: cdk.aws_iam.Effect.ALLOW, actions: [ "bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream", "aoss:*", ], resources: ["*"], }), ); } }
OpenSearch Serverlessコレクションの作成に関しては以下のブログで詳しく説明しています。
OpenSearch ServerlessをCDKで構築してみた | DevelopersIO
このCDKスタックをデプロイするには以下のSSMパラメーターが必要なので、デプロイ前にパラメーターを作成しておいてください。
AossIndexingPrincipalArn
: OpenSearch Serverlessコレクションのインデックス操作を行うプリンシパルのARN(インデックス操作に関しては後述)AossIndexName
: RAGで参照するOpenSearch Serverlessコレクションのインデックス名RagEnabled
: RAGを有効化するかどうかを表すフラグ(true
orfalse
を設定する)SlackBotToken
: Bot User OAuth Tokenの値SlackSignSecret
: Signing Secretの値
デプロイが完了したら、Slack AppのEvent Subscriptions > Enable EventsのRequest URLのドメインをApp Runnerサービスのデフォルトドメインに変更してください。 このURLのパス部分は/slack/events
を指定する必要がある点にご注意ください。
ドキュメントをインデックスする
RAGで参照するドキュメントをOpenSearch Serverlessコレクションのインデックスに追加するために以下のスクリプトを作成しました。
import os import boto3 from dotenv import load_dotenv from langchain_aws import BedrockEmbeddings from langchain_community.document_loaders import WebBaseLoader from langchain_community.vectorstores import OpenSearchVectorSearch from langchain_text_splitters import RecursiveCharacterTextSplitter from opensearchpy import RequestsHttpConnection from requests_aws4auth import AWS4Auth # type: ignore load_dotenv() print("Loading started...") loader = WebBaseLoader( web_paths=["https://classmethod.jp/services/generative-ai/ai-starter/"] ) docs = loader.load() print("Loading completed!") print("Splitting started...") text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) splits = text_splitter.split_documents(docs) embedding = BedrockEmbeddings( model_id="amazon.titan-embed-text-v2:0", region_name="us-east-1", client=None ) print("Splitting completed!") print("Indexing started...") credentials = boto3.Session().get_credentials() aws_auth = AWS4Auth( refreshable_credentials=credentials, region="ap-northeast-1", service="aoss", ) vectorstore = OpenSearchVectorSearch.from_documents( documents=splits, embedding=embedding, opensearch_url=os.environ["AOSS_ENDPOINT_URL"], index_name=os.environ["AOSS_INDEX_NAME"], http_auth=aws_auth, timeout=300, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, engine="faiss", ) print("Indexing completed!")
このスクリプトの処理概要は次のとおりです。
- 生成AI環境構築サービス「AI-Starter」 | サービス | クラスメソッド株式会社 のページ内容を取得する
- 1.の内容をチャンク分割する
- 2.の埋め込みを作成する
- 3.をOpenSearch Serverlessコレクションのインデックスに追加する
このスクリプトでは環境変数を読み込むため、スクリプトと同一ディレクトリに.env
という名前で以下のようなファイルを作成しておいてください。
AOSS_ENDPOINT_URL='https://xxxxxxxxxxxxxxxxxxxx.ap-northeast-1.aoss.amazonaws.com' # OpenSearch ServerlessコレクションのエンドポイントURL AOSS_INDEX_NAME='knowledge-base-index-name' # OpenSearch Serverlessコレクションのインデックス名
SSMパラメーター AossIndexingPrincipalArn
で指定したプリンシパルの認証情報をセット(IAMロールをAssume Roleするなど)した状態で、以下のようにスクリプトを実行することでドキュメント内容がインデックスされます。
環境変数 AOSS_INDEX_NAME
で指定したインデックスが未作成の場合は自動的に作成されます。
❯ poetry -C server run python server/scripts/index_documents.py Loading started... Loading completed! Splitting started... Splitting completed! Indexing started... Indexing completed!
動かしてみる
さて、これで準備が整ったので動かしてみましょう。
まずは環境変数 RAG_ENABLED
に false
を設定して、素のLLMに質問してみます。
AI-Sarter が何か聞いてみたのですが、AI学習プラットフォームとの回答が返ってきました。 これはハルシネーションです。
次に、環境変数 RAG_ENABLED
に true
を設定して、RAGを利用してみます。
先ほどと同じ質問をしてみたところ、今度は正しくAI-Starterについて説明してくれました!
つまずきポイント
LangChainを実行した際に以下のようなエラーが発生しました。
TypeError: ForwardRef._evaluate() missing 1 required keyword-only argument: 'recursive_guard'
どうやら、LangChainが内部で利用している Pydantic がPython v3.12.4で行われた変更に対して追従できてないことが原因のようでした。
Python v3.12.3を利用すればこのエラーを回避できるとのことだったので、Dockerfileのベースイメージを以下のように変更することでエラーが解消しました。
@@ -1,4 +1,6 @@ -FROM python:3.12-slim +# NOTE: Python v3.12.4を利用するとlangchainをインポートする際にエラーが発生するため、v3.12.3を利用する +# ref: https://stackoverflow.com/questions/78593700/langchain-community-langchain-packages-giving-error-missing-1-required-keywor +FROM python:3.12.3-slim WORKDIR /app